package cn.lingyangwl.framework.mqtt.config;

import cn.lingyangwl.framework.tool.core.UUIDUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * Spring configuration for the outbound (producer) side of the MQTT integration.
 *
 * <p>Declares the connect options, the Paho client factory, the outbound message channel
 * and the message handler that is attached to that channel.
 *
 * @author shenguangyang
 */
@Configuration
public class MqttProducerConfig {
    /** MQTT settings; field-injected by the container after construction. */
    @Resource
    private MqttProperties mqttProperties;

    /** Helper used to build {@link MqttConnectOptions} from {@link MqttProperties}. */
    private final MqttConnectConfig mqttConnectConfig;

    /**
     * Bean name of the outbound (publishing) message channel.
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    // "Last will" payload: when the connection between client and server is interrupted
    // unexpectedly, the server publishes the client's will message.
    // Encoded explicitly as UTF-8 so the bytes never depend on the platform default charset.
    // NOTE(review): this constant is not referenced anywhere in this class — confirm whether
    // it was meant to be applied to the connect options.
    private static final byte[] WILL_DATA = "offline".getBytes(StandardCharsets.UTF_8);

    /**
     * Creates the configuration together with its own {@link MqttConnectConfig} helper.
     */
    public MqttProducerConfig() {
        mqttConnectConfig = new MqttConnectConfig();
    }

    /**
     * Builds the connection parameters used by the producer.
     *
     * @return the connect options derived from the injected {@link MqttProperties}
     */
    @Bean
    public MqttConnectOptions getProducerMqttConnectOptions() {
        return mqttConnectConfig.getMqttConnectOptions(mqttProperties);
    }


    /**
     * MQTT client factory for the producer.
     *
     * @return a {@link DefaultMqttPahoClientFactory} configured with the producer connect options
     */
    @Bean
    public MqttPahoClientFactory producerMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getProducerMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT message channel (producer), registered under {@link #CHANNEL_NAME_OUT}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT message handler (producer), bound as service activator to the
     * {@link #CHANNEL_NAME_OUT} channel.
     *
     * @return the handler that publishes outbound messages through the producer client factory
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        // A random UUID suffix is appended to the configured client id for each handler instance.
        // NOTE(review): the base id is read from the *consumer* settings although this is the
        // producer handler — confirm this is intentional and not a copy-paste slip.
        String clientId = mqttProperties.getConsumer().getClientId() + "-" + UUIDUtils.randomUUID();
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                clientId,
                producerMqttClientFactory());
        // Enable the handler's async mode (see the Spring Integration MQTT reference for semantics).
        messageHandler.setAsync(true);
        // Default topic comes from the producer section of the MQTT properties.
        messageHandler.setDefaultTopic(mqttProperties.getProducer().getDefaultTopic());
        return messageHandler;
    }

}
